Java 并发(四) - 深入理解同步实现

Posted by LinYaoTian on 2018-09-19

构建自定义的同步工具

内置的条件队列

条件队列就如同烤面包机上的面包已好的铃声。如果你正在听着它, 当面包烤好后你可以立即注意到, 并且放下手头的事情开始品尝面包, 如果你没有听见它, 你会错过通知消息, 但是回到厨房后还是看到面包的状态, 如果已经烤完, 就取面包, 如果未烤完, 就再次监听铃声。

条件队列中的元素是一个个正在等待相关条件的线程;每一个对象都可以作为一个条件队列,并且 ObjectwaitnotifynotifyAll构成了内部条件队列的 API。

  • waitwait是等待的意思,调用wait会自动释放锁,并请求系统挂起当前线程,从而使其他线程能够获得这个锁 。
  • notify:发出通知,解除阻塞条件,JVM会从这个条件队列上等待的多个线程选择一个来唤醒
  • notifyAll:发出通知,解除阻塞条件,JVM会唤醒所有在这个条件队列上等待的线程。
  • 条件谓语:线程等待的条件

条件等待中存在一个很重要的三元关系:synchronizedwait和一个条件谓语。 条件变量由一个锁保护,检查条件谓语时必须先持有锁,调用waitnotifyAll所在方法的对象必须是同一个对象。

用下面这个代码来解析:

1
2
3
4
5
6
7
8
public synchronized V take() throws InterruptedException{
while(isEmpty()){
wait();
}
V v = get();
notifyAll();
return v;
}

在这块代码中,isEmpty()就是take()方法的条件谓语,A线程如果判断为空,将会调用wait,阻塞A线程,直到其它线程B操作使isEmpty()false,接着B调用notifyAll,释放锁后,唤醒线程A

notifynotifyAll的区别:大多数情况下,应该优先选择notifyAll

原因:假如线程A在条件队列上等待条件谓语PA,线程B在同一个条件队列上等待条件谓语PB,假如线程CPB变为真,且调用notifyJVM将从众多的等待线程选择其中A来唤醒,但是A看到PA仍然为false,于是继续等待,然而线程B本可以开始执行,却没有被唤醒。

只有同时满足以下两个条件时,才能用单一的 notify 而不是 notifyAll:

  • 所有等待线程的类型都相同:只有一个条件谓词与条件队列相关,并且每个线程在从 wait 返回后将执行相同的操作。
  • 单进单出:在条件变量上的每次通知,最多只能唤醒一个线程来执行。

如果有10个线程在条件队列中等待, 调用 notifyAll 会唤醒每一个线程, 让它们去竞争锁, 然后它们中的大多数或者全部又回到休眠状态, 这意味着每一个激活单一线程执行的事件, 都会带来大量的上下文切换, 和大量竞争锁的请求。

Condition 对象

内置的条件队列有一些缺陷,每一个内置锁都只能由一个相关联的条件队列。如果想要编写一个带有多个条件谓语的并发对象,可以使用LockCondition

一个Condition和一个单独的Lock相关联, 调用Lock.newCondition()方法, 可以创建一个Condition。每个Lock可以有任意数量的Condition对象。wait, notify, notifyAllCondition中都有对应的:await, signal, signalAll, 而且一定要使用后者!

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ConditionBoundedBuffer<T> {  
private static final int BUFFER_SIZE = 2;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();//获取 Condition 对象
private final Condition notEmpty = lock.newCondition();
private final T[] items = (T[]) new Object[BUFFER_SIZE];
private int tail, head, count;

public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); //
}
items[tail] = x;
if (++tail == items.length) {
tail = 0;
}
count++;
notEmpty.signal(); //
} finally {
lock.unlock();
}
}

public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();//
}
T x = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
count--;
notFull.signal(); //
return x;
} finally {
lock.unlock();
}
}
}

AbstractQueuedSynchronier (AQS)

同步的实现都是基于AbstractQueuedSynchronizerAQS),AQS是一个用于构建锁和同步器的框架,例如ReentrantLockSemaphoreCountDownLatch等都是基于AQS构建的。

AQS构建的容器中,最基本的就是获取操作和释放操作,对于CountDownLatch,获取意味着等待并直到闭锁到达结束状态,对于FutureTask,获取意味着等待直到任务已经完成。

AQS负责同步容器类中的状态,它管理了一个整数状态信息,可以通过getStatesetState以及compareAndSetState来设置和获取。例如ReentrantLock用它来表示线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始,正在运行,已完成以及以取消)

AQS获取操作可能是独占的, 就像ReentrantLock一样, 也可能是非独占的, 就像SemaphoreCountDownLatch一样, 这取决于不同的Synchronizer

下面使用AQS来实现一个闭锁:(事实上,同步容器就是这样做的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class OneShotLatch {  
private final Sync sync = new Sync();

private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}

public void signal() {
sync.releaseShared(0);
}

public void await() throws InterruptedException {
sync.acqurieSharedInterruptibly(0);
}
}

代码中通过AQS管理闭锁的状态:关闭0, 打开1。await方法调用AQSacqurieSharedInterruptibly, 后者随后请求OneShotLatch中的tryAcquireShared方法必须返回一个值来表明操作能否继续执行。如果闭锁已经事先打开, tryAcquireShared会返回成功, 并允许线程通过;否则他会返回一个值, 表明获取请求的尝试失败。acquireSharedInterruptibly方法处理失败的方式, 是把线程植入一个队列中, 该队列中的元素都是等待中的线程, signal调用releaseShared, 进而导致tryReleaseShared被调用。tryReleasseShared的实现无条件地把闭锁的状态设置为打开, 通过返回值表明Synchronizer处于完全释放的状态。

java.util.concurrent 同步类中的 AQS

java.util.concurrent同步类都没有直接扩展AQS,而是将他们的功能委托给AQS子类实现。
java.util.concurrent中基于AQS开发的类有:ReentrantLockSemaphoreReentrantReadWriteLockCountDownLatchSynchronousQueueFutureTask等。

原子变量与非阻塞同步机制

非阻塞算法被广泛用于操作系统和JVM中实现线程/进程调度机制,垃圾回收机制以及锁和其它并发数据结构。

比较并交换(CAS)

概念:

  1. 比较并交换有三个操作数: 内存位置V,进行比较的值A,新值B
  2. 当且仅当V的值等于旧值A时, CAS才会用新值B原子化地更新V的值, 否则它什么都不会做。
  3. 无论位置V的值是否等于A,CAS都会返回V的真实值。CAS的意思是: 我认为V的值应该是A, 如果是, 那么将B值赋值给V, 若不是, 则不修改, 并告诉我V的旧值为多少。
  4. CAS是一项乐观技术: 它抱着成功的希望进行更新, 并且如果另一个线程在上次检查后更新了该变量, 它能够发现错误 。
  5. 当多个线程试图使用CAS同时更新相同的变量时, 只有一个线程会更新变量的值,而其他的都会失败。然而,失败的线程不会被挂起, 他们会被告知这次赛跑失利, 但是允许重试。由于一个线程不会竞争CAS时不会被阻塞,因此它可以决定是否重试。

CAS 需要由硬件辅助实现。

CAS的典型使用模式: 首先从V中读取值A,根据A计算值B,然后通过CAS以原子操作将V的值A变为B

一个模拟的CAS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SimulateCAS {  
private volatile int value;

public int get() {
return value;
}

public int compareAndSwap(int expectedValue, int newValue) {
int oldValue = value;
if (expectedValue == oldValue) {
value = newValue;
}
return oldValue; //无论能否修改,都返回旧值
}

public boolean compareAndSet(int expectedValue, int newValue) {
return (expectedValue == compareAndSwap(expectedValue, newValue));
}
}

基于CAS实现的线程安全的计数器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CasCounter {  
private SimulateCAS value = new SimulateCAS();

public int getValue() {
return value.get();
}

public int increment() {
int v;
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}

非阻塞算法

如果线程在持有锁时由于阻塞IO,内存页缺失或其他延迟导致推迟执行,那么很可能所有的线程都不能继续执行下去。

  • 非阻塞算法:如果在某种算法中,一个线程的失败或者挂起不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。
  • 无锁算法:如果在算法的每一个步骤都存在某个线程能够执行下去,那么这种算法称为无锁算法(Lock-Free)。
  • 无阻塞,无锁算法:如果算法中仅将CAS用于协调线程之间的操作,并且能够正确地实现,那么它是一种无阻塞,无锁算法。

上面基于CAS实现的线程安全的计数器就是无阻塞算法实现

基于非阻塞的Stack实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ConcurrentStack<E> {  
class Node<E> {
E item;
Node<E> next;
public Node(E item) {
this.item = item;
}

}

AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); // 对栈顶的一个引用

public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}

public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
}

基于非阻塞的链表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class LinkedQueue<E> {  
static class Node<E> {

final E item;
final AtomicReference<Node<E>> next;

public Node() {
this(null, null);
}

public Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<Node<E>>(next);
}
}

private final Node<E> dummy = new Node<E>();
private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

public boolean put(E item) {
Node<E> newNode = new Node<E>(item, null);
while (true) {
Node<E> curTailNode = tail.get();
Node<E> tailNextNode = curTailNode.next.get();
if (curTailNode == tail.get()) {
if (tailNextNode == null) {
// 更新尾节点下一个节点
if (curTailNode.next.compareAndSet(null, newNode)) {
// 更新成功, 将尾节点指向下一个节点
tail.compareAndSet(curTailNode, newNode);
return true;
}
} else {
// 在更新过程中, 发现尾节点的下一个节点被更新了, 将尾节点指向下一个节点
tail.compareAndSet(curTailNode, tailNextNode);
}
}
}
}

public static void main(String[] args) {
final LinkedQueue<String> queue = new LinkedQueue<String>();
new Thread(new Runnable() {
@Override
public void run() {
queue.put("item1");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
queue.put("item2");
}
}).start();
}
}